/*-------------------------------------------------------------------------
 * tablesync.c
 *      PostgreSQL logical replication
 *
 * Copyright (c) 2012-2017, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *      src/backend/replication/logical/tablesync.c
 *
 * NOTES
 *      This file contains code for initial table data synchronization for
 *      logical replication.
 *
 *      The initial data synchronization is done separately for each table,
 *      in a separate apply worker that only fetches the initial snapshot data
 *      from the publisher and then synchronizes the position in the stream with
 *      the main apply worker.
 *
 *      There are several reasons for doing the synchronization this way:
 *       - It allows us to parallelize the initial data synchronization
 *         which lowers the time needed for it to happen.
 *       - The initial synchronization does not have to hold the xid and LSN
 *         for the time it takes to copy data of all tables, causing less
 *         bloat and lower disk consumption compared to doing the
 *         synchronization in a single process for the whole database.
 *       - It allows us to synchronize any tables added after the initial
 *         synchronization has finished.
 *
 *      The stream position synchronization works in multiple steps.
 *       - Sync finishes copy and sets worker state as SYNCWAIT and waits for
 *         state to change in a loop.
 *       - Apply periodically checks tables that are synchronizing for SYNCWAIT.
 *         When the desired state appears, it sets the worker state to
 *         CATCHUP and starts loop-waiting until either the table state is set
 *         to SYNCDONE or the sync worker exits.
 *       - After the sync worker has seen the state change to CATCHUP, it will
 *         read the stream and apply changes (acting like an apply worker) until
 *         it catches up to the specified stream position.  Then it sets the
 *         state to SYNCDONE.  There might be zero changes applied between
 *         CATCHUP and SYNCDONE, because the sync worker might be ahead of the
 *         apply worker.
 *       - Once the state has been set to SYNCDONE, the apply will continue tracking
 *         the table until it reaches the SYNCDONE stream position, at which
 *         point it sets state to READY and stops tracking.  Again, there might
 *         be zero changes in between.
 *
 *      So the state progression is always: INIT -> DATASYNC -> SYNCWAIT -> CATCHUP ->
 *      SYNCDONE -> READY.
 *
 *      The catalog pg_subscription_rel is used to keep information about
 *      subscribed tables and their state.  Some transient state during data
 *      synchronization is kept in shared memory.  The states SYNCWAIT and
 *      CATCHUP only appear in memory.
 *
 *      Example flows look like this:
 *       - Apply is in front:
 *          sync:8
 *            -> set in memory SYNCWAIT
 *          apply:10
 *            -> set in memory CATCHUP
 *            -> enter wait-loop
 *          sync:10
 *            -> set in catalog SYNCDONE
 *            -> exit
 *          apply:10
 *            -> exit wait-loop
 *            -> continue rep
 *          apply:11
 *            -> set in catalog READY
 *       - Sync in front:
 *          sync:10
 *            -> set in memory SYNCWAIT
 *          apply:8
 *            -> set in memory CATCHUP
 *            -> continue per-table filtering
 *          sync:10
 *            -> set in catalog SYNCDONE
 *            -> exit
 *          apply:10
 *            -> set in catalog READY
 *            -> stop per-table filtering
 *            -> continue rep
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "miscadmin.h"
#include "pgstat.h"

#include "access/xact.h"

#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"

#include "commands/copy.h"

#include "parser/parse_relation.h"

#include "replication/logicallauncher.h"
#include "replication/logicalrelation.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"

#include "utils/snapmgr.h"
#include "storage/ipc.h"

#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#ifdef __STORAGE_SCALABLE__
#include "replication/logicalrelation.h"
#include "replication/logical_statistic.h"
#endif

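/*
 * Set to false by the syscache invalidation callback; tells
 * process_syncing_tables_for_apply() to refetch the subscription's table
 * sync states from pg_subscription_rel.
 */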
static bool table_states_valid = false;

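/*
 * Buffer used by copy_read_data() to pass COPY data received from the
 * publisher to the local COPY FROM machinery.
 */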
StringInfo    copybuf = NULL;

/*
 * Exit routine for synchronization worker.
 */
static void
pg_attribute_noreturn()
finish_sync_worker(void)
{
    /*
     * Commit any outstanding transaction. This is the usual case, unless
     * there was nothing to do for the table.
     */
    if (IsTransactionState())
    {
        CommitTransactionCommand();
        pgstat_report_stat(false);
    }

    /* And flush all writes. */
    XLogFlush(GetXLogWriteRecPtr());

    StartTransactionCommand();
    ereport(LOG,
            (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
                    MySubscription->name,
                    get_rel_name(MyLogicalRepWorker->relid))));
    CommitTransactionCommand();

#ifdef __STORAGE_SCALABLE__
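    /*
     * On datanodes, record in the TBase per-table statistics that this table
     * has reached the SYNCDONE state.
     */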
    if (IS_PGXC_DATANODE)
    {
        UpdateSubTableStatistics(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid,
             0, 0, 0, 0, 0, STATE_SYNCDONE, false);
    }
#endif

    /* Find the main apply worker and signal it. */
    logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);

    /* Stop gracefully */
    proc_exit(0);
}

/*
 * Wait until the relation synchronization state is set in the catalog to the
 * expected one.
 *
 * Used when transitioning from CATCHUP state to SYNCDONE.
 *
 * Returns false if the synchronization worker has disappeared or the table state
 * has been reset.
 */
static bool
wait_for_relation_state_change(Oid relid, char expected_state)
{
    int            rc;
    char        state;

    for (;;)
    {
        LogicalRepWorker *worker;
        XLogRecPtr    statelsn;

        CHECK_FOR_INTERRUPTS();

        /* XXX use cache invalidation here to improve performance? */
        PushActiveSnapshot(GetLatestSnapshot());
        state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
                                        relid, &statelsn, true);
        PopActiveSnapshot();

        if (state == SUBREL_STATE_UNKNOWN)
            return false;

        if (state == expected_state)
            return true;

        /* Check if the opposite worker is still running and bail if not. */
        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
        worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
                                        am_tablesync_worker() ? InvalidOid : relid,
                                        false);
        LWLockRelease(LogicalRepWorkerLock);
        if (!worker)
            return false;

        rc = WaitLatch(MyLatch,
                       WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
                       1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);

        /* emergency bailout if postmaster has died */
        if (rc & WL_POSTMASTER_DEATH)
            proc_exit(1);

        ResetLatch(MyLatch);
    }

    return false;
}

/*
 * Wait until the apply worker changes the state of our synchronization
 * worker to the expected one.
 *
 * Used when transitioning from SYNCWAIT state to CATCHUP.
 *
 * Returns false if the apply worker has disappeared.
 */
static bool
wait_for_worker_state_change(char expected_state)
{
    int            rc;

    for (;;)
    {
        LogicalRepWorker *worker;

        CHECK_FOR_INTERRUPTS();

        /*
         * Done if already in correct state.  (We assume this fetch is atomic
         * enough to not give a misleading answer if we do it with no lock.)
         */
        if (MyLogicalRepWorker->relstate == expected_state)
            return true;

        /*
         * Bail out if the apply worker has died, else signal it we're
         * waiting.
         */
        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
        worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
                                        InvalidOid, false);
        if (worker && worker->proc)
            logicalrep_worker_wakeup_ptr(worker);
        LWLockRelease(LogicalRepWorkerLock);
        if (!worker)
            break;

        /*
         * Wait.  We expect to get a latch signal back from the apply worker,
         * but use a timeout in case it dies without sending one.
         */
        rc = WaitLatch(MyLatch,
                       WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
                       1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);

        /* emergency bailout if postmaster has died */
        if (rc & WL_POSTMASTER_DEATH)
            proc_exit(1);

        if (rc & WL_LATCH_SET)
            ResetLatch(MyLatch);
    }

    return false;
}

/*
 * Callback from syscache invalidation.
 */
void
invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
{
    table_states_valid = false;
}

/*
 * Handle table synchronization cooperation from the synchronization
 * worker.
 *
 * If the sync worker is in CATCHUP state and reached (or passed) the
 * predetermined synchronization point in the WAL stream, mark the table as
 * SYNCDONE and finish.
 */
static void
process_syncing_tables_for_sync(XLogRecPtr current_lsn)
{
    Assert(IsTransactionState());

#ifdef __SUBSCRIPTION__

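    /*
     * When this subscription is split into parallel TBase sub-subscriptions,
     * this table may only be declared SYNCDONE once our position has caught
     * up with every other parallel worker.
     */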
    if (MySubscription->is_all_actived == true &&
        MySubscription->parallel_number > 1)
    {
        ListCell   *lc_simple;

        /* Get the LogicalRepWorker of every other parallel sub-subscription worker (parallel_index != 0). */
        List       *parallel_sub_workers_list = GetTbaseSubscriptnParallelWorker(MySubscription->oid);

        foreach (lc_simple, parallel_sub_workers_list)
        {
            LogicalRepWorker *sub_worker = (LogicalRepWorker *) lfirst(lc_simple);

            SpinLockAcquire(&sub_worker->relmutex);
            if (current_lsn < sub_worker->reply_lsn)
            {
                /*
                 * Our position must have reached every other parallel
                 * worker's reply LSN before we may mark the table SYNCDONE;
                 * otherwise come back and retry later.
                 */
                SpinLockRelease(&sub_worker->relmutex);
                return;
            }
            SpinLockRelease(&sub_worker->relmutex);
        }

        list_free(parallel_sub_workers_list);
    }

#endif

    SpinLockAcquire(&MyLogicalRepWorker->relmutex);

    if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
        current_lsn >= MyLogicalRepWorker->relstate_lsn)
    {
        TimeLineID    tli;

        MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
        MyLogicalRepWorker->relstate_lsn = current_lsn;

#ifdef __STORAGE_SCALABLE__
        /* update table statistics */
        logicalrep_statis_update_for_sync(MyLogicalRepWorker->relid, MyLogicalRepWorker->subid, MySubscription->name);
#endif

        SpinLockRelease(&MyLogicalRepWorker->relmutex);

        SetSubscriptionRelState(MyLogicalRepWorker->subid,
                                MyLogicalRepWorker->relid,
                                MyLogicalRepWorker->relstate,
                                MyLogicalRepWorker->relstate_lsn,
                                true, false);

        walrcv_endstreaming(wrconn, &tli);
        finish_sync_worker();
    }
    else
        SpinLockRelease(&MyLogicalRepWorker->relmutex);
}

/*
 * Handle table synchronization cooperation from the apply worker.
 *
 * Walk over all subscription tables that are individually tracked by the
 * apply process (currently, all that have state other than
 * SUBREL_STATE_READY) and manage synchronization for them.
 *
 * If there are tables that need synchronizing and are not being synchronized
 * yet, start sync workers for them (if there are free slots for sync
 * workers).  To prevent starting the sync worker for the same relation at a
 * high frequency after a failure, we store its last start time with each sync
 * state info.  We start the sync worker for the same relation after waiting
 * at least wal_retrieve_retry_interval.
 *
 * For tables that are being synchronized already, check if sync workers
 * either need action from the apply worker or have finished.  This is the
 * SYNCWAIT to CATCHUP transition.
 *
 * If the synchronization position is reached (SYNCDONE), then the table can
 * be marked as READY and is no longer tracked.
 */
static void
process_syncing_tables_for_apply(XLogRecPtr current_lsn)
{// #lizard forgives
    struct tablesync_start_time_mapping
    {
        Oid            relid;
        TimestampTz last_start_time;
    };
    static List *table_states = NIL;
    static HTAB *last_start_times = NULL;
    ListCell   *lc;
    bool        started_tx = false;

    Assert(!IsTransactionState());

    /* We need up-to-date sync state info for subscription tables here. */
    if (!table_states_valid)
    {
        MemoryContext oldctx;
        List       *rstates;
        ListCell   *lc;
        SubscriptionRelState *rstate;

        /* Clean the old list. */
        list_free_deep(table_states);
        table_states = NIL;

        StartTransactionCommand();
        started_tx = true;

        /* Fetch all non-ready tables. */
        rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);

#ifdef __SUBSCRIPTION__
        if (MySubscription->is_all_actived == false &&
            MySubscription->parallel_number > 1 &&
            rstates == NIL &&
            MySubscription->parallel_index == 0)
        {
            /*
             * The initial copy is done for every table, so activate the other
             * parallel tbase-sub-subscriptions right away.
             */
            ActiveAllParallelTbaseSubscriptions(current_lsn);
            Assert(MySubscription->is_all_actived);
            if (MySubscription->is_all_actived == false)
                abort();
        }
#endif
        /* Allocate the tracking info in a permanent memory context. */
        oldctx = MemoryContextSwitchTo(CacheMemoryContext);
        foreach(lc, rstates)
        {
            rstate = palloc(sizeof(SubscriptionRelState));
            memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
            table_states = lappend(table_states, rstate);
        }
        MemoryContextSwitchTo(oldctx);

        table_states_valid = true;
    }

    /*
     * Prepare a hash table for tracking last start times of workers, to avoid
     * immediate restarts.  We don't need it if there are no tables that need
     * syncing.
     */
    if (table_states && !last_start_times)
    {
        HASHCTL        ctl;

        memset(&ctl, 0, sizeof(ctl));
        ctl.keysize = sizeof(Oid);
        ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
        last_start_times = hash_create("Logical replication table sync worker start times",
                                       256, &ctl, HASH_ELEM | HASH_BLOBS);
    }

    /*
     * Clean up the hash table when we're done with all tables (just to
     * release the bit of memory).
     */
    else if (!table_states && last_start_times)
    {
        hash_destroy(last_start_times);
        last_start_times = NULL;
    }

    /*
     * Process all tables that are being synchronized.
     */
    foreach(lc, table_states)
    {
        SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);

        if (rstate->state == SUBREL_STATE_SYNCDONE)
        {
            /*
             * Apply has caught up to the position where the table sync has
             * finished.  Mark the table as ready so that the apply will just
             * continue to replicate it normally.
             */
            if (current_lsn >= rstate->lsn)
            {
                rstate->state = SUBREL_STATE_READY;
                rstate->lsn = current_lsn;
                if (!started_tx)
                {
                    StartTransactionCommand();
                    started_tx = true;
                }
                SetSubscriptionRelState(MyLogicalRepWorker->subid,
                                        rstate->relid, rstate->state,
                                        rstate->lsn, true, false);
#ifdef __SUBSCRIPTION__
                /*
                 * If the initial COPY has completed for every table, activate
                 * all the other parallel tbase-sub-subscriptions.
                 */
                if (MySubscription->is_all_actived == false &&
                    MySubscription->parallel_number > 1)
                {
                    List       *notready_list;

                    notready_list = GetSubscriptionNotReadyRelations(MySubscription->oid);
                    if (notready_list == NIL)
                    {
                        /*
                         * All copies are done; activate the other parallel
                         * tbase-sub-subscriptions right away.
                         */
                        ActiveAllParallelTbaseSubscriptions(current_lsn);
                        Assert(MySubscription->is_all_actived);
                        if (MySubscription->is_all_actived == false)
                            abort();
                    }
                    else
                    {
                        /* Some tables are still being copied; do nothing. */
                        list_free_deep(notready_list);
                    }
                }
#endif
            }
        }
        else
        {
            LogicalRepWorker *syncworker;

            /*
             * Look for a sync worker for this relation.
             */
            LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

            syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
                                                rstate->relid, false);

            if (syncworker)
            {
                /* Found one, update our copy of its state */
                SpinLockAcquire(&syncworker->relmutex);
                rstate->state = syncworker->relstate;
                rstate->lsn = syncworker->relstate_lsn;
                if (rstate->state == SUBREL_STATE_SYNCWAIT)
                {
                    /*
                     * Sync worker is waiting for apply.  Tell sync worker it
                     * can catchup now.
                     */
                    syncworker->relstate = SUBREL_STATE_CATCHUP;
                    syncworker->relstate_lsn =
                        Max(syncworker->relstate_lsn, current_lsn);
                }
                SpinLockRelease(&syncworker->relmutex);

                /* If we told worker to catch up, wait for it. */
                if (rstate->state == SUBREL_STATE_SYNCWAIT)
                {
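                    /* Becomes true once the relation's catalog state reaches SYNCDONE. */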
                    bool        is_syncdone = false;

                    /* Signal the sync worker, as it may be waiting for us. */
                    if (syncworker->proc)
                        logicalrep_worker_wakeup_ptr(syncworker);

                    /* Now safe to release the LWLock */
                    LWLockRelease(LogicalRepWorkerLock);

                    /*
                     * Enter busy loop and wait for synchronization worker to
                     * reach expected state (or die trying).
                     */
                    if (!started_tx)
                    {
                        StartTransactionCommand();
                        started_tx = true;
                    }

                    is_syncdone = wait_for_relation_state_change(rstate->relid,
                                                                 SUBREL_STATE_SYNCDONE);
#ifdef __SUBSCRIPTION__
                    /*
                     * Mark rstate->relid as SUBREL_STATE_SYNCDONE for the
                     * other sub_child subscriptions (parallel_index != 0) as
                     * well.  After AlterSubscription_refresh their state is
                     * still SUBREL_STATE_INIT, and those workers never start
                     * a sync worker of their own to advance it.
                     */
                    if (is_syncdone && MySubscription->parallel_number > 1 &&
                        MySubscription->parallel_index == 0)
                    {
                        ListCell   *lc_simple;
                        ListCell   *notready_simple;

                        /*
                         * Get the parallel sub_child oids, i.e. roughly:
                         *   SELECT sub_child FROM tbase_subscription_parallel
                         *    WHERE sub_parent IN (SELECT sub_parent
                         *                           FROM tbase_subscription_parallel
                         *                          WHERE sub_child = MySubscription->oid)
                         */
                        List       *parallel_childids_list =
                            GetTbaseSubscriptnParallelChild(MySubscription->oid);

                        /*
                         * Effectively:
                         *   UPDATE pg_subscription_rel
                         *      SET srsubstate = SUBREL_STATE_SYNCDONE
                         *    WHERE srrelid = rstate->relid
                         *      AND srsubid IN (parallel_childids_list)
                         */
                        foreach (lc_simple, parallel_childids_list)
                        {
                            Oid         sub_oid = lfirst_oid(lc_simple);

                            if (sub_oid != MySubscription->oid)
                            {
                                List       *notready_list;

                                notready_list = GetSubscriptionNotReadyRelations(sub_oid);
                                if (notready_list != NIL)
                                {
                                    foreach (notready_simple, notready_list)
                                    {
                                        SubscriptionRelState *sub_rel_state = (SubscriptionRelState *) lfirst(notready_simple);

                                        if (sub_rel_state->relid == rstate->relid)
                                        {
                                            /*
                                             * Only relations that have not yet
                                             * passed SUBREL_STATE_SYNCDONE
                                             * appear in the not-ready list, so
                                             * it is safe to advance them here.
                                             */
                                            SetSubscriptionRelState(sub_oid,
                                                                    rstate->relid,
                                                                    SUBREL_STATE_SYNCDONE,
                                                                    rstate->lsn, true, false);
                                        }
                                    }
                                }
                            }
                        }

                        list_free(parallel_childids_list);
                    }
#endif
                }
                else
                    LWLockRelease(LogicalRepWorkerLock);
            }
            else
            {
#ifdef __SUBSCRIPTION__
                /* Only the worker with parallel_index == 0 may start sync workers. */
				if (MySubscription->parallel_index == 0)
				{
#endif
					/*
					 * If there is no sync worker for this table yet, count
					 * running sync workers for this subscription, while we have
					 * the lock.
					 */
					int			nsyncworkers =
							logicalrep_sync_worker_count(MyLogicalRepWorker->subid);

					/* Now safe to release the LWLock */
					LWLockRelease(LogicalRepWorkerLock);

					/*
					 * If there are free sync worker slot(s), start a new sync
					 * worker for the table.
					 */
					if (nsyncworkers < max_sync_workers_per_subscription)
					{
						TimestampTz now = GetCurrentTimestamp();
						struct tablesync_start_time_mapping *hentry;
						bool		found;

						hentry = hash_search(last_start_times, &rstate->relid,
                                         HASH_ENTER, &found);

						if (!found ||
								TimestampDifferenceExceeds(hentry->last_start_time, now,
                                                   wal_retrieve_retry_interval))
						{
							logicalrep_worker_launch(MyLogicalRepWorker->dbid,
                                                 MySubscription->oid,
                                                 MySubscription->name,
                                                 MyLogicalRepWorker->userid,
                                                 rstate->relid);
							hentry->last_start_time = now;
						}
                    }
#ifdef __SUBSCRIPTION__
                }
				else
					LWLockRelease(LogicalRepWorkerLock);
#endif
            }
        }
    }

    if (started_tx)
    {
        CommitTransactionCommand();
        pgstat_report_stat(false);
    }
}

/*
 * Process possible state change(s) of tables that are being synchronized.
 */
void
process_syncing_tables(XLogRecPtr current_lsn)
{
    if (am_tablesync_worker())
        process_syncing_tables_for_sync(current_lsn);
    else
        process_syncing_tables_for_apply(current_lsn);
}

/*
 * Create list of columns for COPY based on logical relation mapping.
 */
static List *
make_copy_attnamelist(LogicalRepRelMapEntry *rel)
{
    List       *attnamelist = NIL;
    int            i;

    for (i = 0; i < rel->remoterel.natts; i++)
    {
        attnamelist = lappend(attnamelist,
                              makeString(rel->remoterel.attnames[i]));
    }


    return attnamelist;
}

/*
 * Data source callback for the COPY FROM, which reads from the remote
 * connection and passes the data back to our local COPY.
 */
static int
copy_read_data(void *outbuf, int minread, int maxread)
{// #lizard forgives
    int            bytesread = 0;
    int            avail;

    /* If there is some data left over from the previous read, use it. */
    avail = copybuf->len - copybuf->cursor;
    if (avail)
    {
        if (avail > maxread)
            avail = maxread;
        memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
        copybuf->cursor += avail;
        maxread -= avail;
        bytesread += avail;
    }

    while (maxread > 0 && bytesread < minread)
    {
        pgsocket    fd = PGINVALID_SOCKET;
        int            rc;
        int            len;
        char       *buf = NULL;

        for (;;)
        {
            /* Try read the data. */
            len = walrcv_receive(wrconn, &buf, &fd);

            CHECK_FOR_INTERRUPTS();

            if (len == 0)
                break;
            else if (len < 0)
                return bytesread;
            else
            {
                /* Process the data */
                copybuf->data = buf;
                copybuf->len = len;
                copybuf->cursor = 0;

                avail = copybuf->len - copybuf->cursor;
                if (avail > maxread)
                    avail = maxread;
                memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
                outbuf = (void *) ((char *) outbuf + avail);
                copybuf->cursor += avail;
                maxread -= avail;
                bytesread += avail;
            }

            if (maxread <= 0 || bytesread >= minread)
                return bytesread;
        }

        /*
         * Wait for more data or latch.
         */
        rc = WaitLatchOrSocket(MyLatch,
                               WL_SOCKET_READABLE | WL_LATCH_SET |
                               WL_TIMEOUT | WL_POSTMASTER_DEATH,
                               fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);

        /* Emergency bailout if postmaster has died */
        if (rc & WL_POSTMASTER_DEATH)
            proc_exit(1);

        ResetLatch(MyLatch);
    }

    return bytesread;
}


/*
 * Get information about a remote relation, in a similar fashion to what the
 * RELATION message provides during replication.
 */
static void
fetch_remote_table_info(char *nspname, char *relname,
                        LogicalRepRelation *lrel)
{
    WalRcvExecResult *res;
    StringInfoData cmd;
    TupleTableSlot *slot;
    Oid            tableRow[2] = {OIDOID, CHAROID};
    Oid            attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
    bool        isnull;
    int            natt;

    lrel->nspname = nspname;
    lrel->relname = relname;

    /* First fetch Oid and replica identity. */
    initStringInfo(&cmd);
    appendStringInfo(&cmd, "SELECT c.oid, c.relreplident"
                     "  FROM pg_catalog.pg_class c"
                     "  INNER JOIN pg_catalog.pg_namespace n"
                     "        ON (c.relnamespace = n.oid)"
                     " WHERE n.nspname = %s"
                     "   AND c.relname = %s"
                     "   AND c.relkind = 'r'",
                     quote_literal_cstr(nspname),
                     quote_literal_cstr(relname));
    res = walrcv_exec(wrconn, cmd.data, 2, tableRow);

    if (res->status != WALRCV_OK_TUPLES)
        ereport(ERROR,
                (errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
                        nspname, relname, res->err)));

    slot = MakeSingleTupleTableSlot(res->tupledesc);
    if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
        ereport(ERROR,
                (errmsg("table \"%s.%s\" not found on publisher",
                        nspname, relname)));

    lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
    Assert(!isnull);
    lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
    Assert(!isnull);

    ExecDropSingleTupleTableSlot(slot);
    walrcv_clear_result(res);

    /* Now fetch columns. */
    resetStringInfo(&cmd);
    appendStringInfo(&cmd,
                     "SELECT a.attname,"
                     "       a.atttypid,"
                     "       a.atttypmod,"
                     "       a.attnum = ANY(i.indkey)"
                     "  FROM pg_catalog.pg_attribute a"
                     "  LEFT JOIN pg_catalog.pg_index i"
                     "       ON (i.indexrelid = pg_get_replica_identity_index(%u))"
                     " WHERE a.attnum > 0::pg_catalog.int2"
                     "   AND NOT a.attisdropped"
                     "   AND a.attrelid = %u"
                     " ORDER BY a.attnum",
                     lrel->remoteid, lrel->remoteid);
    res = walrcv_exec(wrconn, cmd.data, 4, attrRow);

    if (res->status != WALRCV_OK_TUPLES)
        ereport(ERROR,
                (errmsg("could not fetch table info for table \"%s.%s\": %s",
                        nspname, relname, res->err)));

    /* We don't know the number of rows coming, so allocate enough space. */
    lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
    lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
    lrel->attkeys = NULL;

    natt = 0;
    slot = MakeSingleTupleTableSlot(res->tupledesc);
    while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    {
        lrel->attnames[natt] =
            TextDatumGetCString(slot_getattr(slot, 1, &isnull));
        Assert(!isnull);
        lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
        Assert(!isnull);
        if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
            lrel->attkeys = bms_add_member(lrel->attkeys, natt);

        /* Should never happen. */
        if (++natt >= MaxTupleAttributeNumber)
            elog(ERROR, "too many columns in remote table \"%s.%s\"",
                 nspname, relname);

        ExecClearTuple(slot);
    }
    ExecDropSingleTupleTableSlot(slot);

    lrel->natts = natt;

    walrcv_clear_result(res);
    pfree(cmd.data);
}

/*
 * Copy existing data of a table from publisher.
 *
 * Caller is responsible for locking the local relation.
 */
#ifdef __STORAGE_SCALABLE__
static uint64
copy_table(Relation rel, List *shards)
#else
static void
copy_table(Relation rel)
#endif
{// #lizard forgives
    LogicalRepRelMapEntry *relmapentry;
    LogicalRepRelation lrel;
    WalRcvExecResult *res;
    StringInfoData cmd;
    CopyState    cstate;
    List       *attnamelist;
    ParseState *pstate;
#ifdef __STORAGE_SCALABLE__
    uint64 nCopyIn = 0;
#endif
    /* Get the publisher relation info. */
    fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
                            RelationGetRelationName(rel), &lrel);

    /* Put the relation into relmap. */
    logicalrep_relmap_update(&lrel);

    /* Map the publisher relation to local one. */
    relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
    Assert(rel == relmapentry->localrel);

    /* Start copy on the publisher. */
    initStringInfo(&cmd);
#ifdef __STORAGE_SCALABLE__
    /* copy table with shards */
    if (shards)
    {
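        /*
         * Builds a command of the form
         *   COPY <schema>.<table> sharding (<shardid>, ...) TO STDOUT
         * using the TBase "sharding" extension to COPY, so that only the
         * relation's subscribed shards are copied.
         */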
        bool     first;
        ListCell *cell;
        
        appendStringInfo(&cmd, "COPY %s sharding (",
                 quote_qualified_identifier(lrel.nspname, lrel.relname));
        first = true;
        foreach(cell, shards)
        {
            if (!first)
            {
                appendStringInfo(&cmd, ",%d", lfirst_int(cell));
            }
            else
            {
                appendStringInfo(&cmd, "%d", lfirst_int(cell));
                first = false;
            }
        }
        
        appendStringInfoString(&cmd, ") TO STDOUT");
    }
    else
    {
#endif
    appendStringInfo(&cmd, "COPY %s TO STDOUT",
                     quote_qualified_identifier(lrel.nspname, lrel.relname));
#ifdef __STORAGE_SCALABLE__
    }
#endif
    res = walrcv_exec(wrconn, cmd.data, 0, NULL);
    pfree(cmd.data);
    if (res->status != WALRCV_OK_COPY_OUT)
        ereport(ERROR,
                (errmsg("could not start initial contents copy for table \"%s.%s\": %s",
                        lrel.nspname, lrel.relname, res->err)));
    walrcv_clear_result(res);

    copybuf = makeStringInfo();

    pstate = make_parsestate(NULL);
    addRangeTableEntryForRelation(pstate, rel, NULL, false, false);

    attnamelist = make_copy_attnamelist(relmapentry);
    cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, attnamelist, NIL);

    /* Do the copy */
#ifdef __STORAGE_SCALABLE__
    nCopyIn = CopyFrom(cstate);
#else
    (void) CopyFrom(cstate);
#endif

#ifdef __SUBSCRIPTION__
    if (IS_PGXC_COORDINATOR)
        EndCopyFrom(cstate);
#endif

    logicalrep_rel_close(relmapentry, NoLock);

#ifdef __STORAGE_SCALABLE__
    return nCopyIn;
#endif
}

/*
 * Start syncing the table in the sync worker.
 *
 * The returned slot name is palloc'ed in current memory context.
 */
char *
LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
{// #lizard forgives
    char       *slotname;
    char       *err;
    char        relstate;
    XLogRecPtr    relstate_lsn;

    /* Check the state of the table synchronization. */
    StartTransactionCommand();
    relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
                                       MyLogicalRepWorker->relid,
                                       &relstate_lsn, true);
    CommitTransactionCommand();

    SpinLockAcquire(&MyLogicalRepWorker->relmutex);
    MyLogicalRepWorker->relstate = relstate;
    MyLogicalRepWorker->relstate_lsn = relstate_lsn;
    SpinLockRelease(&MyLogicalRepWorker->relmutex);

    /*
     * To build a slot name for the sync work, we are limited to NAMEDATALEN -
     * 1 characters.  We cut the original slot name to NAMEDATALEN - 28 chars
     * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0').  (It's actually the
     * NAMEDATALEN on the remote that matters, but this scheme will also work
     * reasonably if that is different.)
     */
    StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small");    /* for sanity */
    slotname = psprintf("%.*s_%u_sync_%u",
                        NAMEDATALEN - 28,
                        MySubscription->slotname,
                        MySubscription->oid,
                        MyLogicalRepWorker->relid);

    /*
     * Here we use the slot name instead of the subscription name as the
     * application_name, so that it is different from the main apply worker,
     * so that synchronous replication can distinguish them.
     */
    wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
    if (wrconn == NULL)
        ereport(ERROR,
                (errmsg("could not connect to the publisher: %s", err)));

    switch (MyLogicalRepWorker->relstate)
    {
        case SUBREL_STATE_INIT:
        case SUBREL_STATE_DATASYNC:
            {
                Relation    rel;
                WalRcvExecResult *res;
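                /*
                 * TBase parallel subscriptions: set to false below if our
                 * start position is still behind another parallel worker.
                 */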
                bool        origin_startpos_is_greater = true;

#ifdef __STORAGE_SCALABLE__
                List        *shards = NULL;
                uint64      nTups_copy = 0;
#endif

                SpinLockAcquire(&MyLogicalRepWorker->relmutex);
                MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
                MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
                SpinLockRelease(&MyLogicalRepWorker->relmutex);

                /* Update the state and make it visible to others. */
                StartTransactionCommand();
                SetSubscriptionRelState(MyLogicalRepWorker->subid,
                                        MyLogicalRepWorker->relid,
                                        MyLogicalRepWorker->relstate,
                                        MyLogicalRepWorker->relstate_lsn,
                                        true, false);
                CommitTransactionCommand();
                pgstat_report_stat(false);

                /*
                 * We want to do the table data sync in a single transaction.
                 */
                StartTransactionCommand();

#ifdef __STORAGE_SCALABLE__
                /* get shards of relation */
                shards = GetSubscriptionRelShards(MyLogicalRepWorker->subid, 
                                                  MyLogicalRepWorker->relid);
#endif

                /*
                 * Use a standard write lock here. It might be better to
                 * disallow access to the table while it's being synchronized.
                 * But we don't want to block the main apply process from
                 * working and it has to open the relation in RowExclusiveLock
                 * when remapping remote relation id to local one.
                 */
                rel = heap_open(MyLogicalRepWorker->relid, RowExclusiveLock);

                /*
                 * Create a temporary slot for the sync process. We do this
                 * inside the transaction so that we can use the snapshot made
                 * by the slot to get existing data.
                 */
                res = walrcv_exec(wrconn,
                                  "BEGIN READ ONLY ISOLATION LEVEL "
                                  "REPEATABLE READ", 0, NULL);
                if (res->status != WALRCV_OK_COMMAND)
                    ereport(ERROR,
                            (errmsg("table copy could not start transaction on publisher"),
                             errdetail("The error was: %s", res->err)));
                walrcv_clear_result(res);

                /*
                 * Create new temporary logical decoding slot.
                 *
                 * We'll use slot for data copy so make sure the snapshot is
                 * used for the transaction; that way the COPY will get data
                 * that is consistent with the lsn used by the slot to start
                 * decoding.
                 */
#ifdef __STORAGE_SCALABLE__
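                /*
                 * TBase: for a shard-restricted copy, create the slot with
                 * the extended signature that also passes the subscription
                 * and relation identity.
                 */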
                if (shards)
                {
                    walrcv_create_slot(wrconn, slotname, true,
                                       CRS_USE_SNAPSHOT, origin_startpos,
                                       MySubscription->name, MySubscription->oid,
                                       get_namespace_name(RelationGetNamespace(rel)),
                                       RelationGetRelationName(rel));
                }
                else
                {
                    walrcv_create_slot(wrconn, slotname, true,
                                       CRS_USE_SNAPSHOT, origin_startpos,
                                       NULL, InvalidOid, NULL, NULL);
                }

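                /* Record that this table has entered the data-copy phase (TBase per-table statistics). */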
                UpdateSubTableStatistics(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid,
                                         0, 0, 0, 0, 0, STATE_DATACOPY, true);
#else
                walrcv_create_slot(wrconn, slotname, true,
                                   CRS_USE_SNAPSHOT, origin_startpos);
#endif
                PushActiveSnapshot(GetTransactionSnapshot());
#ifdef __STORAGE_SCALABLE__
                nTups_copy = copy_table(rel, shards);
#else
                copy_table(rel);
#endif
                PopActiveSnapshot();

                res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
                if (res->status != WALRCV_OK_COMMAND)
                    ereport(ERROR,
                            (errmsg("table copy could not finish transaction on publisher"),
                             errdetail("The error was: %s", res->err)));
                walrcv_clear_result(res);

                heap_close(rel, NoLock);

                /* Make the copy visible. */
                CommandCounterIncrement();

#ifdef __STORAGE_SCALABLE__
                /* Record the rows copied and mark the copy phase done (TBase statistics). */
                UpdateSubTableStatistics(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid,
                                         nTups_copy, 0, 0, 0, 0, STATE_COPYDONE, false);
                UpdateSubStatistics(MySubscription->name, nTups_copy, 0, 0, 0, 0, false);

#endif

                /*
                 * We are done with the initial data synchronization, update
                 * the state.
                 */
                SpinLockAcquire(&MyLogicalRepWorker->relmutex);
                MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
                MyLogicalRepWorker->relstate_lsn = *origin_startpos;
                SpinLockRelease(&MyLogicalRepWorker->relmutex);

                /* Wait for main apply worker to tell us to catchup. */
                wait_for_worker_state_change(SUBREL_STATE_CATCHUP);


#ifdef __SUBSCRIPTION__

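                /*
                 * With parallel TBase sub-subscriptions, only take the
                 * SYNCDONE shortcut below if our start position has caught up
                 * with every other parallel worker; otherwise fall through
                 * and catch up via the normal apply path.
                 */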
                if (MySubscription->is_all_actived == true &&
                    MySubscription->parallel_number > 1)
                {
                    ListCell   *lc_simple;

                    /* Get the LogicalRepWorker of every other parallel sub-subscription worker (parallel_index != 0). */
                    List       *parallel_sub_workers_list = GetTbaseSubscriptnParallelWorker(MySubscription->oid);

                    foreach (lc_simple, parallel_sub_workers_list)
                    {
                        LogicalRepWorker *sub_worker = (LogicalRepWorker *) lfirst(lc_simple);

                        SpinLockAcquire(&sub_worker->relmutex);
                        if (*origin_startpos < sub_worker->reply_lsn)
                        {
                            /*
                             * Our start position is still behind another
                             * parallel worker, so we cannot take the SYNCDONE
                             * shortcut below.
                             */
                            SpinLockRelease(&sub_worker->relmutex);
                            origin_startpos_is_greater = false;
                            break;
                        }
                        SpinLockRelease(&sub_worker->relmutex);
                    }

                    list_free(parallel_sub_workers_list);
                }

#endif
                /*----------
                 * There are now two possible states here:
                 * a) Sync is behind the apply.  If that's the case we need to
                 *      catch up with it by consuming the logical replication
                 *      stream up to the relstate_lsn.  For that, we exit this
                 *      function and continue in ApplyWorkerMain().
                 * b) Sync is caught up with the apply.  So it can just set
                 *      the state to SYNCDONE and finish.
                 *----------
                 */
                if (origin_startpos_is_greater == true &&
                    *origin_startpos >= MyLogicalRepWorker->relstate_lsn)
                {
                    /*
                     * Update the new state in catalog.  No need to bother
                     * with the shmem state as we are exiting for good.
                     */
                    SetSubscriptionRelState(MyLogicalRepWorker->subid,
                                            MyLogicalRepWorker->relid,
                                            SUBREL_STATE_SYNCDONE,
                                            *origin_startpos,
                                            true, false);
#ifdef __STORAGE_SCALABLE__
                    UpdateSubTableStatistics(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid,
                         0, 0, 0, 0, 0, STATE_APPLY, false);
#endif
                    finish_sync_worker();
                }
                break;
            }
        case SUBREL_STATE_SYNCDONE:
        case SUBREL_STATE_READY:
        case SUBREL_STATE_UNKNOWN:

            /*
             * Nothing to do here but finish.  (UNKNOWN means the relation was
             * removed from pg_subscription_rel before the sync worker could
             * start.)
             */
            finish_sync_worker();
            break;
        default:
            elog(ERROR, "unknown relation state \"%c\"",
                 MyLogicalRepWorker->relstate);
    }

#ifdef __STORAGE_SCALABLE__
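    /* The sync worker now moves on to applying changes; record that in the TBase statistics. */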
    UpdateSubTableStatistics(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid,
         0, 0, 0, 0, 0, STATE_APPLY, false);
#endif

    return slotname;
}
